package com.trytech.mongoocrawler.web.netty.client;

import com.alibaba.fastjson.JSONObject;
import com.trytech.mongoocrawler.common.transport.protocol.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.log4j.Log4j2;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Created by hp on 2017-7-10.
 */
@Log4j2
public class MonitorTcpClient {
    private static ProtocolFilterChain filterChain;
    private static ConcurrentHashMap<String, LinkedBlockingDeque<AbstractProtocol>> dataMap = new ConcurrentHashMap<String, LinkedBlockingDeque<AbstractProtocol>>();
    private String host;
    private int port;
    private ChannelFuture channelFuture;
    private EventLoopGroup group;
    private NioSocketChannel socketChannel;


    private MonitorTcpClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static MonitorTcpClient newInstance(String host, int port) {
        MonitorTcpClient nettyTcpClient = new MonitorTcpClient(host, port);
        nettyTcpClient.start();
        return nettyTcpClient;
    }

    public void start() {
        //创建工作线程池
        if (group == null || group.isTerminated() || group.isShutdown()) {
            group = new NioEventLoopGroup();
        }
        try {
            //创建启动器
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.remoteAddress(this.host, this.port).group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        public void initChannel(NioSocketChannel ch) throws Exception {
                            ChannelPipeline channelPipeline = ch.pipeline();
                            channelPipeline.addLast(new ProtocolEncoder());
                            channelPipeline.addLast(new ProtocolDecoder());
                            channelPipeline.addLast(new NettyTcpClientEventHandler());
                        }
                    });
            channelFuture = bootstrap.connect().sync();
            socketChannel = (NioSocketChannel) channelFuture.channel();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public AbstractProtocol sendRequest(CrawlerTransferProtocol protocol) {
        try {
            socketChannel.writeAndFlush(protocol).sync();
            LinkedBlockingDeque<AbstractProtocol> dataQueue = dataMap.get(protocol.getContent().getTraceId());
            if (dataQueue == null) {
                dataQueue = new LinkedBlockingDeque();
                dataMap.put(protocol.getContent().getTraceId(), dataQueue);
            }
            return dataQueue.takeLast();
        } catch (InterruptedException e) {
            return null;
        }
    }

    public void close() {
        if (group != null) {
            // 优雅退出，释放NIO线程组
            group.shutdownGracefully();
        }
    }

    public class NettyTcpClientEventHandler extends SimpleChannelInboundHandler<CrawlerTransferProtocol> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, CrawlerTransferProtocol sourceProtocol) throws Exception {
            LinkedBlockingDeque<AbstractProtocol> dataQueue = dataMap.get(sourceProtocol.getContent().getTraceId());
            dataQueue.putFirst(sourceProtocol.getContent());
            System.out.println(JSONObject.toJSONString(sourceProtocol));
        }
    }
}
